package com.offcn.bigdata.spark.sql.p1

import com.offcn.bigdata.spark.domain.Student
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConversions
import scala.collection.JavaConverters._

/**
  * Building the Spark SQL programming model (DataFrame / Dataset).
  *
  *  1. Reflection-based construction — infer the schema from a Java bean
  *     ([[createDataFrameByRelection]]; note: method name keeps its original
  *     spelling for source compatibility with existing callers).
  *  2. Dynamic construction — pair an RDD[Row] with an explicit StructType
  *     schema ([[main]]).
  */
object _02SparkSQLModel2DataFrameOps {
    def main(args: Array[String]): Unit = {
        // Use Kryo serialization and pre-register Student so serialized
        // payloads stay compact (avoids writing full class names per object).
        val conf = new SparkConf()
            .set("spark.serializer", classOf[KryoSerializer].getName)
            .registerKryoClasses(Array(classOf[Student]))
        val spark = SparkSession.builder()
                        .appName("_02SparkSQLModelOps")
                        .master("local[*]")
                        .config(conf)
                        .getOrCreate()

        // Dynamic approach: convert each Student into a generic Row; the
        // column meaning comes solely from the StructType defined below.
        val rowRDD: RDD[Row] = spark.sparkContext.parallelize(List(
            new Student("车传广", 23, "男", "辽宁"),
            new Student("闫逾恒", 22, "男", "河北"),
            new Student("刘博", 20, "男", "天津"),
            new Student("王鑫达", 23, "男", "浙江"),
            new Student("田志", 23, "男", "河南")
        )).map(stu => Row(stu.getName, stu.getAge, stu.getGender, stu.getProvince))

        // Field order here must match the value order inside each Row above.
        val schema = StructType(
            Array(
                StructField("name", DataTypes.StringType, true),
                StructField("age", DataTypes.IntegerType, true),
                StructField("gender", DataTypes.StringType, true),
                StructField("province", DataTypes.StringType, true)
            )
        )
        val df = spark.createDataFrame(rowRDD, schema)

        df.printSchema()

        df.show()
        spark.stop()
    }

    /**
      * Reflection-based construction: Spark derives the schema from the
      * Student bean's getters, so no explicit StructType is needed.
      *
      * @param spark the active SparkSession used to create the DataFrame
      */
    def createDataFrameByRelection(spark: SparkSession): Unit = {
        val list = List(
            new Student("车传广", 23, "男", "辽宁"),
            new Student("闫逾恒", 22, "男", "河北"),
            new Student("刘博", 20, "男", "天津"),
            new Student("王鑫达", 23, "男", "浙江"),
            new Student("田志", 23, "男", "河南")
        )
        // Explicit JavaConverters decorator instead of the deprecated
        // implicit scala.collection.JavaConversions, which is removed in
        // Scala 2.13 and hides conversions from the reader.
        val javaList = list.asJava

        val sdf = spark.createDataFrame(javaList, classOf[Student])
        sdf.printSchema()
        sdf.show()
    }
}

//case class Student(val name: String, val age: Int, val gender: String, val province: String)
